Naive Bayes Classifier [Spam Filtering]

Problem Statement
The input data is a set of SMS messages, each of which has been labeled
as either “ham” (legitimate) or “spam”. The goal of the exercise is to
build a model that classifies new messages as ham or spam.
Techniques used:
1. Naive Bayes Classifier
2. Training and Testing
3. Confusion Matrix
4. Text Pre-Processing
5. Pipelines
# -*- coding: utf-8 -*-

import os

os.chdir("/home/cloudops/spark")
os.getcwd()          # confirm the current working directory

# =====================================
# Load CSV file into an RDD
# =====================================
# 2 partitions
smsData = sc.textFile("data/SMSSpamCollection.csv", 2)
smsData.cache()
smsData.collect()
smsData.count()      # 1000
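
# collect() pulls the entire RDD to the driver; for a quick look at the
# raw lines, take() is lighter (an optional check on the same file)
smsData.take(5)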

# =====================================
# Transform to Vector
# 0.0 for ham
# 1.0 for spam
# =====================================

def TransformToVector(inputStr):
    # Split only on the first comma so commas inside the message are kept
    attList = inputStr.split(",", 1)
    smsType = 0.0 if attList[0] == "ham" else 1.0
    return [smsType, attList[1]]

smsXformed = smsData.map(TransformToVector)
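
# Optional sanity check on the label encoding; countByValue() returns a
# dict of {label: number of messages} (actual counts depend on the dataset)
smsXformed.map(lambda row: row[0]).countByValue()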

# =====================================
# Create DataFrame from Vector
# =====================================
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

smsDf = sqlContext.createDataFrame(smsXformed, ["label","message"])
smsDf.cache()
smsDf.select("label","message").show()
# +-----+--------------------+
# |label|             message|
# +-----+--------------------+
# |  0.0|Go until jurong p...|
# |  0.0|Ok lar... Joking ...|
# |  0.0|U dun say so earl...|
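
# The inferred schema should show label as double and message as string
smsDf.printSchema()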

# =====================================
# Split DF to Training and Testing Parts
# =====================================
(trainingData, testData) = smsDf.randomSplit([0.9, 0.1])
trainingData.count()  # 882
testData.count()      # 118
testData.collect()
# . . .
# Row(label=1.0, message='Your B4U voucher w/c 27/03 is MARSMS. ...'),
# Row(label=1.0, message='it to 80488. Your 500 free text messages...')]
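
# randomSplit is not deterministic from run to run; passing a seed makes
# the 90/10 split reproducible (optional variant of the split above):
# (trainingData, testData) = smsDf.randomSplit([0.9, 0.1], seed=42)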

# =====================================
# Setup Pipeline
# =====================================
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.feature import IDF

# 1. Tokenizer: lower-case each message and split it into words
tokenizer = Tokenizer(inputCol="message", outputCol="words")

# 2. HashingTF: hash the words into a fixed-size term-frequency vector
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),
                      outputCol="tempfeatures")

# 3. IDF: scale the term frequencies by inverse document frequency (TF-IDF)
idf = IDF(inputCol=hashingTF.getOutputCol(),
          outputCol="features")

# 4. Naive Bayes classifier (reads the "label" and "features" columns by default)
nbClassifier = NaiveBayes()
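
# NaiveBayes() defaults to multinomial NB with Laplace smoothing of 1.0,
# which suits word-count style features; the explicit equivalent would be
# nbClassifier = NaiveBayes(smoothing=1.0, modelType="multinomial")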

# =====================================
# Pipeline
# =====================================
pipeline = Pipeline(stages=[tokenizer,
                            hashingTF,
                            idf,
                            nbClassifier])

nbModel = pipeline.fit(trainingData)

# Run every pipeline stage on the test data in one call
prediction = nbModel.transform(testData)
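
# NaiveBayes appends rawPrediction, probability and prediction columns;
# quick look at a few scored rows:
prediction.select("label", "prediction", "probability").show(5)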

# =================================
# Build a Confusion Matrix
# =================================
prediction.groupBy("label","prediction").count().show()
# +-----+----------+-----+
# |label|prediction|count|
# +-----+----------+-----+
# |  1.0|       1.0|   56|
# |  0.0|       1.0|    4|
# |  1.0|       0.0|    4|
# |  0.0|       0.0|   54|
# +-----+----------+-----+

# 110 of the 118 test messages are classified correctly (~93% accuracy):
# a good level of prediction
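
# =================================
# Accuracy (cross-check)
# =================================
# A single accuracy figure can be computed with the built-in evaluator
# (assumes Spark 2.0+, where metricName="accuracy" is available; exact
# numbers vary with the random split)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")
evaluator.evaluate(prediction)   # ~0.93 for the counts shown above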